/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.server;

import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.dispatcher.DispatchManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.memory.ClusterMemoryManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.Duration;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import java.net.URI;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Supplier;

import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_BINARY;
import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_COMPACT;
import static com.facebook.airlift.http.client.thrift.ThriftRequestUtils.APPLICATION_THRIFT_FB_COMPACT;
import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.facebook.presto.server.security.RoleType.USER;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;

/**
 * REST resource rooted at {@code /v1/cluster}.
 *
 * <p>Exposes four GET endpoints:
 * <ul>
 *   <li>{@code /v1/cluster} - aggregate query/worker statistics ({@link ClusterStats})</li>
 *   <li>{@code /v1/cluster/memory} - whatever {@code ClusterMemoryManager.getMemoryPoolInfo()} returns</li>
 *   <li>{@code /v1/cluster/workerMemory} - whatever {@code ClusterMemoryManager.getWorkerMemoryInfo()} returns</li>
 *   <li>{@code /v1/cluster/ttl} - whatever {@code ClusterTtlProviderManager.getClusterTtl()} returns</li>
 * </ul>
 *
 * <p>When the resource manager is enabled, the stats endpoint forwards the request to a
 * resource manager node unless the caller passes {@code includeLocalInfoOnly=true}.
 */
@Path("/v1/cluster")
@RolesAllowed({ADMIN, USER})
public class ClusterStatsResource
{
    private final InternalNodeManager nodeManager;
    private final DispatchManager dispatchManager;
    private final boolean isIncludeCoordinator;
    private final boolean resourceManagerEnabled;
    private final ClusterMemoryManager clusterMemoryManager;
    private final Optional<ResourceManagerProxy> proxyHelper;
    private final InternalResourceGroupManager internalResourceGroupManager;
    private final ClusterTtlProviderManager clusterTtlProviderManager;
    // Produces the locally computed stats; either memoized for a configured duration or computed on every call.
    private final Supplier<ClusterStats> clusterStatsSupplier;

    /**
     * Wires the resource to its collaborators and decides whether locally computed
     * stats are cached.
     *
     * <p>If {@code serverConfig.getClusterStatsExpirationDuration()} has a value greater
     * than zero, computed stats are memoized for that duration; otherwise they are
     * recomputed on every request.
     *
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public ClusterStatsResource(
            NodeSchedulerConfig nodeSchedulerConfig,
            ServerConfig serverConfig,
            InternalNodeManager nodeManager,
            DispatchManager dispatchManager,
            ClusterMemoryManager clusterMemoryManager,
            Optional<ResourceManagerProxy> proxyHelper,
            InternalResourceGroupManager internalResourceGroupManager,
            ClusterTtlProviderManager clusterTtlProviderManager)
    {
        this.isIncludeCoordinator = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
        this.resourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
        this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
        this.clusterMemoryManager = requireNonNull(clusterMemoryManager, "clusterMemoryManager is null");
        // Each message names the parameter actually being checked so a failed injection points at the right dependency.
        this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
        this.internalResourceGroupManager = requireNonNull(internalResourceGroupManager, "internalResourceGroupManager is null");
        this.clusterTtlProviderManager = requireNonNull(clusterTtlProviderManager, "clusterTtlProviderManager is null");

        // serverConfig has already been null-checked above.
        Duration expirationDuration = serverConfig.getClusterStatsExpirationDuration();
        if (expirationDuration.getValue() > 0) {
            this.clusterStatsSupplier = memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS);
        }
        else {
            // A non-positive duration disables caching.
            this.clusterStatsSupplier = this::calculateClusterStats;
        }
    }

    /**
     * Responds with cluster statistics.
     *
     * <p>If the resource manager is enabled and {@code includeLocalInfoOnly} is false, the
     * request is proxied to a resource manager node. Otherwise the response is resumed
     * with the (possibly cached) locally computed {@link ClusterStats}.
     *
     * @param xForwardedProto value of the {@code X-Forwarded-Proto} header, passed through to the proxy path
     * @param uriInfo request URI information, used to build the proxied URI
     * @param servletRequest the incoming request, handed to the proxy helper when proxying
     * @param asyncResponse the suspended response that is resumed with the result
     * @param includeLocalInfoOnly when true, never proxy; defaults to {@code false}
     */
    @GET
    @Produces({MediaType.APPLICATION_JSON, APPLICATION_THRIFT_BINARY, APPLICATION_THRIFT_COMPACT, APPLICATION_THRIFT_FB_COMPACT})
    public void getClusterStats(
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse,
            @QueryParam("includeLocalInfoOnly") @DefaultValue("false") boolean includeLocalInfoOnly)
    {
        if (resourceManagerEnabled && !includeLocalInfoOnly) {
            proxyClusterStats(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }
        asyncResponse.resume(Response.ok(clusterStatsSupplier.get()).build());
    }

    /**
     * Computes a fresh {@link ClusterStats} snapshot from the node manager, the dispatch
     * manager and the resource group manager.
     *
     * <p>Queries in state {@code QUEUED} are counted as queued; queries in state
     * {@code RUNNING} are counted as blocked when their stats report fully blocked and as
     * running otherwise. For every query that is not done, its input rows/bytes and CPU
     * time are added on top of the totals reported by {@code dispatchManager.getStats()},
     * and its memory reservation, running drivers and running tasks are summed.
     */
    private ClusterStats calculateClusterStats()
    {
        long runningQueries = 0;
        long blockedQueries = 0;
        long queuedQueries = 0;

        long activeNodes = nodeManager.getNodes(NodeState.ACTIVE).size();
        if (!isIncludeCoordinator) {
            // NOTE(review): this assumes exactly one coordinator is among the ACTIVE nodes;
            // if none is, the reported worker count is one too low (and can reach -1) - confirm.
            activeNodes -= 1;
        }

        long runningDrivers = 0;
        long runningTasks = 0;
        double memoryReservation = 0;

        // NOTE(review): presumably these counters only cover work already accounted for by the
        // dispatch manager, which is why in-flight queries are added below - confirm.
        long totalInputRows = dispatchManager.getStats().getConsumedInputRows().getTotalCount();
        long totalInputBytes = dispatchManager.getStats().getConsumedInputBytes().getTotalCount();
        long totalCpuTimeSecs = dispatchManager.getStats().getConsumedCpuTimeSecs().getTotalCount();

        for (BasicQueryInfo query : dispatchManager.getQueries()) {
            QueryState state = query.getState();
            if (state == QueryState.QUEUED) {
                queuedQueries++;
            }
            else if (state == QueryState.RUNNING) {
                if (query.getQueryStats().isFullyBlocked()) {
                    blockedQueries++;
                }
                else {
                    runningQueries++;
                }
            }

            if (!state.isDone()) {
                totalInputBytes += query.getQueryStats().getRawInputDataSize().toBytes();
                totalInputRows += query.getQueryStats().getRawInputPositions();
                // NOTE(review): if getValue(SECONDS) yields a fractional value, this compound
                // assignment narrows it to long per query (sub-second remainders are dropped) - confirm intended.
                totalCpuTimeSecs += query.getQueryStats().getTotalCpuTime().getValue(SECONDS);

                memoryReservation += query.getQueryStats().getUserMemoryReservation().toBytes();
                runningDrivers += query.getQueryStats().getRunningDrivers();
                runningTasks += query.getQueryStats().getRunningTasks();
            }
        }

        return new ClusterStats(
                runningQueries,
                blockedQueries,
                queuedQueries,
                activeNodes,
                runningDrivers,
                runningTasks,
                memoryReservation,
                totalInputRows,
                totalInputBytes,
                totalCpuTimeSecs,
                internalResourceGroupManager.getQueriesQueuedOnInternal());
    }

    /**
     * Responds with the memory pool info reported by the cluster memory manager.
     *
     * @param xForwardedProto bound from the {@code X-Forwarded-Proto} header; not used by this method
     * @param uriInfo request URI information; not used by this method
     * @return a 200 response whose entity is {@code clusterMemoryManager.getMemoryPoolInfo()}
     */
    @GET
    @Path("memory")
    public Response getClusterMemoryPoolInfo(@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @Context UriInfo uriInfo)
    {
        return Response.ok()
                .entity(clusterMemoryManager.getMemoryPoolInfo())
                .build();
    }

    /**
     * Responds with the worker memory info reported by the cluster memory manager.
     *
     * @param xForwardedProto bound from the {@code X-Forwarded-Proto} header; not used by this method
     * @param uriInfo request URI information; not used by this method
     * @return a 200 response whose entity is {@code clusterMemoryManager.getWorkerMemoryInfo()}
     */
    @GET
    @Path("workerMemory")
    public Response getWorkerMemoryInfo(@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @Context UriInfo uriInfo)
    {
        return Response.ok()
                .entity(clusterMemoryManager.getWorkerMemoryInfo())
                .build();
    }

    /**
     * Responds with the cluster TTL reported by the cluster TTL provider manager.
     *
     * @return a 200 response whose entity is {@code clusterTtlProviderManager.getClusterTtl()}
     */
    @GET
    @Path("ttl")
    public Response getClusterTtl()
    {
        return Response.ok().entity(clusterTtlProviderManager.getClusterTtl()).build();
    }

    /**
     * Forwards the current request to the first resource manager node returned by the
     * node manager.
     *
     * <p>The target URI is the request URI with its scheme, host and port replaced by those
     * of the chosen resource manager. If no resource manager is known, the response is
     * resumed with {@code 503 Service Unavailable}. Any exception raised while preparing or
     * dispatching the proxied request is handed to {@code asyncResponse.resume(Throwable)}
     * instead of propagating to the caller.
     *
     * @param xForwardedProto accepted for signature compatibility; not used by this method
     */
    private void proxyClusterStats(HttpServletRequest servletRequest, AsyncResponse asyncResponse, String xForwardedProto, UriInfo uriInfo)
    {
        try {
            checkState(proxyHelper.isPresent(), "proxyHelper is not present, cannot proxy cluster stats request");
            Iterator<InternalNode> resourceManagers = nodeManager.getResourceManagers().iterator();
            if (!resourceManagers.hasNext()) {
                asyncResponse.resume(Response.status(SERVICE_UNAVAILABLE).build());
                return;
            }
            InternalNode resourceManagerNode = resourceManagers.next();

            URI uri = uriInfo.getRequestUriBuilder()
                    .scheme(resourceManagerNode.getInternalUri().getScheme())
                    .host(resourceManagerNode.getHostAndPort().toInetAddress().getHostName())
                    .port(resourceManagerNode.getInternalUri().getPort())
                    .build();
            proxyHelper.get().performRequest(servletRequest, asyncResponse, uri);
        }
        catch (Exception e) {
            // Surface the failure through the suspended response so the request does not hang.
            asyncResponse.resume(e);
        }
    }

    /**
     * Immutable snapshot of cluster-wide statistics, serializable as JSON and Thrift.
     *
     * <p>Thrift field ids 1-11 follow the constructor parameter order and must stay stable
     * for wire compatibility.
     */
    @ThriftStruct
    public static class ClusterStats
    {
        private final long runningQueries;
        private final long blockedQueries;
        private final long queuedQueries;

        private final long activeWorkers;
        private final long runningDrivers;
        private final long runningTasks;
        private final double reservedMemory;

        private final long totalInputRows;
        private final long totalInputBytes;
        private final long totalCpuTimeSecs;
        private final long adjustedQueueSize;

        /**
         * Creates a snapshot from already-computed values; no validation is performed.
         */
        @JsonCreator
        @ThriftConstructor
        public ClusterStats(
                @JsonProperty("runningQueries") long runningQueries,
                @JsonProperty("blockedQueries") long blockedQueries,
                @JsonProperty("queuedQueries") long queuedQueries,
                @JsonProperty("activeWorkers") long activeWorkers,
                @JsonProperty("runningDrivers") long runningDrivers,
                @JsonProperty("runningTasks") long runningTasks,
                @JsonProperty("reservedMemory") double reservedMemory,
                @JsonProperty("totalInputRows") long totalInputRows,
                @JsonProperty("totalInputBytes") long totalInputBytes,
                @JsonProperty("totalCpuTimeSecs") long totalCpuTimeSecs,
                @JsonProperty("adjustedQueueSize") long adjustedQueueSize)
        {
            this.runningQueries = runningQueries;
            this.blockedQueries = blockedQueries;
            this.queuedQueries = queuedQueries;
            this.activeWorkers = activeWorkers;
            this.runningDrivers = runningDrivers;
            this.runningTasks = runningTasks;
            this.reservedMemory = reservedMemory;
            this.totalInputRows = totalInputRows;
            this.totalInputBytes = totalInputBytes;
            this.totalCpuTimeSecs = totalCpuTimeSecs;
            this.adjustedQueueSize = adjustedQueueSize;
        }

        /** @return the number of running queries supplied at construction */
        @JsonProperty
        @ThriftField(1)
        public long getRunningQueries()
        {
            return runningQueries;
        }

        /** @return the number of blocked queries supplied at construction */
        @JsonProperty
        @ThriftField(2)
        public long getBlockedQueries()
        {
            return blockedQueries;
        }

        /** @return the number of queued queries supplied at construction */
        @JsonProperty
        @ThriftField(3)
        public long getQueuedQueries()
        {
            return queuedQueries;
        }

        /** @return the number of active workers supplied at construction */
        @JsonProperty
        @ThriftField(4)
        public long getActiveWorkers()
        {
            return activeWorkers;
        }

        /** @return the number of running drivers supplied at construction */
        @JsonProperty
        @ThriftField(5)
        public long getRunningDrivers()
        {
            return runningDrivers;
        }

        /** @return the number of running tasks supplied at construction */
        @JsonProperty
        @ThriftField(6)
        public long getRunningTasks()
        {
            return runningTasks;
        }

        /** @return the reserved memory value supplied at construction */
        @JsonProperty
        @ThriftField(7)
        public double getReservedMemory()
        {
            return reservedMemory;
        }

        /** @return the total input row count supplied at construction */
        @JsonProperty
        @ThriftField(8)
        public long getTotalInputRows()
        {
            return totalInputRows;
        }

        /** @return the total input byte count supplied at construction */
        @JsonProperty
        @ThriftField(9)
        public long getTotalInputBytes()
        {
            return totalInputBytes;
        }

        /** @return the total CPU time in seconds supplied at construction */
        @JsonProperty
        @ThriftField(10)
        public long getTotalCpuTimeSecs()
        {
            return totalCpuTimeSecs;
        }

        /** @return the adjusted queue size supplied at construction */
        @JsonProperty
        @ThriftField(11)
        public long getAdjustedQueueSize()
        {
            return adjustedQueueSize;
        }
    }
}
